由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。
寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點
繼續走下去的動力。
對我的文章有興趣,歡迎到我的網站上 https://chechiachang.github.io 閱讀其他技術文章
,有任何謬誤也請各方大德直接聯繫我,感激不盡。
先看一看 kafka pods
$ kubectl get pods --selector='app=kafka'
NAME READY STATUS RESTARTS AGE
kafka-1-0 1/1 Running 1 26d
kafka-1-1 1/1 Running 0 26d
kafka-1-2 1/1 Running 0 26d
$ kubectl get pods -l 'app=zookeeper'
NAME READY STATUS RESTARTS AGE
kafka-1-zookeeper-0 1/1 Running 0 26d
kafka-1-zookeeper-1 1/1 Running 0 26d
kafka-1-zookeeper-2 1/1 Running 0 26d
$ kubectl get pods -l 'app=kafka-exporter'
NAME READY STATUS RESTARTS AGE
kafka-1-exporter-88786d84b-z954z 1/1 Running 5 26d
kubectl describe pods kafka-1-0
Name: kafka-1-0
Namespace: default
Priority: 0
Node: gke-chechiachang-pool-1-e4622744-wcq0/10.140.15.212
Labels: app=kafka
controller-revision-hash=kafka-1-69986d7477
release=kafka-1
statefulset.kubernetes.io/pod-name=kafka-1-0
Annotations: kubernetes.io/limit-ranger: LimitRanger plugin set: cpu request for container kafka-broker
Status: Running
IP: 10.12.6.178
Controlled By: StatefulSet/kafka-1
Containers:
kafka-broker:
Image: confluentinc/cp-kafka:5.0.1
Port: 9092/TCP
Host Port: 0/TCP
Command:
sh
-exc
unset KAFKA_PORT && \
export KAFKA_BROKER_ID=${POD_NAME##*-} && \
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092 && \
exec /etc/confluent/docker/run
Requests:
cpu: 100m
Liveness: exec [sh -ec /usr/bin/jps | /bin/grep -q SupportedKafka] delay=30s timeout=5s period=10s #success=1 #failure=3
Readiness: tcp-socket :kafka delay=30s timeout=5s period=10s #success=1 #failure=3
Environment:
POD_IP: (v1:status.podIP)
POD_NAME: kafka-1-0 (v1:metadata.name)
POD_NAMESPACE: default (v1:metadata.namespace)
KAFKA_HEAP_OPTS: -Xmx4G -Xms1G
KAFKA_ZOOKEEPER_CONNECT: kafka-1-zookeeper:2181
KAFKA_LOG_DIRS: /opt/kafka/data/logs
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: false
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MESSAGE_MAX_BYTES: 16000000
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 5555
Mounts:
/opt/kafka/data from datadir (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-2tm8c (ro)
Conditions:
Volumes:
datadir:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: datadir-kafka-1-0
ReadOnly: false
default-token-2tm8c:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-2tm8c
Optional: false
講幾個重點:
看一下 service 與 endpoints
zookeeper 與 exporter 我們這邊先掠過不談,到專章講高可用性與服務監測時,再來討論。
$ kubectl get service -l 'app=kafka'
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-1 ClusterIP 10.15.242.178 <none> 9092/TCP 26d
kafka-1-headless ClusterIP None <none> 9092/TCP 26d
兩個 services
簡單來說,kafka broker 會做 auto service discovery,我們可以使用 headless service。
客戶端(consumer & producer) 連入時,則使用 cluster-ip service,做 load balancing。
$ kubectl get endpoints -l 'app=kafka'
NAME ENDPOINTS AGE
kafka-1 10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092 26d
kafka-1-headless 10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092 26d
附上簡單的 Golang 客戶端,完整 Github Repository 在這邊
package main
import (
"context"
"fmt"
"strconv"
"time"
"github.com/segmentio/kafka-go" // 使用的套件
)
func main() {
topic := "ticker" // 指定 message 要使用的 topic
partition := 0 // 指定 partition,由於底下連線指定連線到 partition 的 leader,所以需要指定 partition
kafkaURL := "kafka-0:9092" // 指定 kafkaURL,也可以透過 os.GetEnv() 從環境變數裡拿到。
// producer 對指定 topic, partition 的 leader 產生連線
producerConn, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
// 程式結束最後把 connection 關掉。不關會造成 broker 累積大量 connection,需要等待 broker 端 timeout 才會釋放。
defer producerConn.Close()
//producerConn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 使用 go routine 跑一個 subprocess for loop,一直產生 message 到 kafka topic,這邊的範例是每秒推一個秒數。
go func() {
for {
producerConn.WriteMessages(
kafka.Message{
Value: []byte(strconv.Itoa(time.Now().Second())),
},
)
time.Sleep(1 * time.Second)
}
}()
// make a new reader that consumes from topic-A, partition 0
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaURL},
Topic: topic,
Partition: 0,
MinBytes: 10e2, // 1KB
MaxBytes: 10e3, // 10KB
})
defer r.Close()
//r.SetOffset(42)
// 印出 reader 收到的 message
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("%v message at offset %d: %s = %s\n", time.Now(), m.Offset, string(m.Key), string(m.Value))
}
}
這邊可以使用 Dockerfile 包成一個 container image,然後丟上 kubernetes
我稍晚補一下 docker image 跟 deployment 方便大家操作好了。
或是懶人測試,直接 kubectl run 一個 golang base image 讓它 sleep,然後再連進去
kubectl run DEPLOYMENT_NAME --image=golang:1.13.0-alpine3.10 sleep 3600
kubectl exec -it POD_NAME sh
# 裡面沒有 Git 跟 vim 裝一下
apk add git vim
go get github.com/chechiachang/kafka-on-kubernetes
cd src/github.com/chechiachang/kafka-on-kubernetes/
vim main.go
go build .
./kafka-on-kubernetes
2019-09-24 14:20:46.872554693 +0000 UTC m=+9.154112787 message at offset 1: = 46
2019-09-24 14:20:47.872563087 +0000 UTC m=+9.154121166 message at offset 2: = 47
2019-09-24 14:20:48.872568848 +0000 UTC m=+9.154126926 message at offset 3: = 48
2019-09-24 14:20:49.872574499 +0000 UTC m=+9.154132576 message at offset 4: = 49
2019-09-24 14:20:50.872579957 +0000 UTC m=+9.154138032 message at offset 5: = 50
2019-09-24 14:20:51.872588823 +0000 UTC m=+9.154146892 message at offset 6: = 51
2019-09-24 14:20:52.872594672 +0000 UTC m=+9.154152748 message at offset 7: = 52
2019-09-24 14:20:53.872599986 +0000 UTC m=+9.154158060 message at offset 8: = 53
這樣就連上了,完成一個最簡單的使用範例。
這個例子太過簡單,上一篇講的 consumer group, partitions, offset 什麼設定全都沒用上。實務上這些都需要好好思考,並且依
據需求做調整設定。
把測試用的 deployment 幹掉
kubectl delete deployment DEPLOYMENT_NAME
或是攋人測試,直接 kubectl run 一個 golang base image 讓它 sleep,然後在連進去。
攋人 -> 懶人。
為何要做 sleep 3600 才能執行 exec 進去該 POD。
感謝協助修正~
k8s 跑一個 Pod,通常會指定一個 entrypoint,作為這個 Pod 的 main process。例如我起一個 API server,他的 main process 就是 serve 某個 port。
k8s 預設會監測這個 main process 作為 liveness check,如果 process 斷了,k8s 會判斷 main process 出問題,依照預設設定重啟整個 Pod。例如 API server main process 死了,可能就會想要自動重啟 Pod。
我們今天開這個 golang Pod,只需要 golang library,不希望裡面已經有 main process,或是因為 main process healthcheck 失敗讓我們的 Pod 被重啟,只是希望這個 Pod 一直開啟著讓我們開發或除錯,所以讓他一直睡。
這跟使用 Busybox在 K8s debug 時的用法一樣。
當客戶端在 k8s 外也能連上嗎?
這個範例不行
還需補上 ingress ,或是更改 service 成為 load balancer,去接 gcp load balancer
然後把 gcp node-level 的 firewall port 9092 打開
了解!!
看到你说两个service的不同点,我突然想到一个问题。
假如在集群里新建两个namespace,frontend和backend,分别创建一个ABC front和一个ABC back服务,用headless service。
就是这样的结果
frontend: ABC front
backend: ABC back
我好奇的是: ABC front的代码里,请求ABC back的url地址,直接写上ABC back的pod service name,这样ABC front的请求可以到达ABC back吗?
类似这样get ABC_back/api/GetUserList
另外如果不用pod service name,而是用ingress直接写域名,域名dns server也是在公司内部,那么这个流量会怎么走?
我猜大概是这样,有错欢迎指出
kube-proxy >> flannel cni >> docker bridge >> dns server gateway >> 然后回流 >> docker bridge >> flannel cni >> kube-proxy
是不是相比用headless service时间更长?